
Conversation


@alessandrobologna alessandrobologna commented Dec 10, 2025

📬 Issue #: #1065

✍️ Description of changes:

Support Lambda Managed Instances / concurrent runtime (refs #1065)

Note

I originally built this to experiment with Lambda Managed Instances in a test workload and thought it might be a useful starting point for upstream support. Parts of the implementation were drafted with the help of an AI assistant and have only been exercised in my own workloads and the existing test suite so far, so please treat this as a starting point for discussion rather than a final design. A super simple demo/benchmark of this implementation is at https://github.com/alessandrobologna/rust-lmi-demo

This draft PR implements a concurrent runtime mode for Lambda Managed Instances, plus the API changes required in lambda-runtime and lambda-http. It preserves existing single-invocation behavior for classic Lambda. The implementation is based on the AWS guidance for building runtimes that support Lambda Managed Instances, as described in Building custom runtimes for Lambda Managed Instances.

Architecture (high level)

The runtime chooses between a classic "one invocation at a time" loop and a concurrent mode controlled by AWS_LAMBDA_MAX_CONCURRENCY:

  • Sequential mode (default): a single /next long-poll loop fetches one event at a time and invokes the handler synchronously, preserving existing behavior.
  • Concurrent mode (AWS_LAMBDA_MAX_CONCURRENCY >= 2): the runtime spawns AWS_LAMBDA_MAX_CONCURRENCY worker tasks. Each worker runs its own /next long-poll loop and processes invocations sequentially within that task, so overall concurrency is bounded by the number of workers. If a worker crashes, the runtime continues operating with the remaining workers and only terminates if it can't keep at least one worker alive (see the sketch after the diagram below).
sequenceDiagram
    participant Lambda as Lambda orchestrator
    participant Runtime as lambda-runtime
    participant Worker as Runtime worker task
    participant API as Runtime API
    participant Handler as User handler

    Lambda->>Runtime: Start process (env AWS_LAMBDA_MAX_CONCURRENCY = N)
    Runtime->>Worker: Spawn N worker tasks
    loop each worker (identical)
        loop each invocation
            Worker->>API: GET /runtime/invocation/next
            API-->>Worker: Invocation event + context headers
            Worker->>Handler: Call handler(event, context)
            Handler-->>Worker: Result or error
            Worker->>API: POST /response or /error
        end
    end
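For illustration, the mode selection boils down to roughly the following (a sketch; the helper names are hypothetical and the real decision lives inside the runtime):

```rust
/// Hypothetical helper: anything unset, unparsable, or <= 1 means sequential mode.
fn max_concurrency_from_env() -> usize {
    std::env::var("AWS_LAMBDA_MAX_CONCURRENCY")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .filter(|&n| n >= 2)
        .unwrap_or(1)
}

fn is_concurrent_mode() -> bool {
    // Concurrent mode only kicks in at 2 or more workers.
    max_concurrency_from_env() >= 2
}
```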

Breaking changes & compatibility

  • Existing entrypoints (lambda_runtime::run, lambda_http::run, and lambda_http::run_with_streaming_response) keep their original signatures and sequential behavior. Handlers that compiled against the current release should continue to compile unchanged.
  • New concurrent entrypoints are added:
    • lambda_runtime::run_concurrent
    • lambda_http::run_concurrent
    • lambda_http::run_with_streaming_response_concurrent
      These require handler services to implement Clone + Send + 'static (with responses/stream bodies Send/Sync + 'static) so they can be safely cloned and driven by the concurrent runtime.
  • AWS_LAMBDA_MAX_CONCURRENCY is read directly by the runtime to decide concurrency and size the HTTP pool, without adding new public fields to Config (avoids semver breakage for struct literals).
  • In concurrent mode (when the new entrypoints are used and AWS_LAMBDA_MAX_CONCURRENCY > 1), the runtime no longer sets _X_AMZN_TRACE_ID in the process environment. The per-invocation X-Ray trace ID is available via Context::xray_trace_id and tracing spans instead. Sequential mode behavior is unchanged.
  • For the concurrent entrypoints, the behavior when AWS_LAMBDA_MAX_CONCURRENCY is set changes from "always sequential per environment" to "per-environment concurrency up to that value". Code that continues to call the existing run functions will remain strictly sequential even if the env var is set.

In other words: the earlier versions of this branch tightened the bounds on the existing run functions, but after maintainer feedback those entrypoints are left as-is and concurrency is opt-in via the new *_concurrent APIs.
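For a handler author, opting in might look like the sketch below (illustrative only; plain async fns wrapped in service_fn are Clone, which satisfies the new bounds):

```rust
use lambda_runtime::{run_concurrent, service_fn, Error, LambdaEvent};
use serde_json::{json, Value};

async fn handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
    // The per-invocation X-Ray trace ID lives on the context, not in the process env.
    Ok(json!({
        "requestId": event.context.request_id,
        "xrayTraceId": event.context.xray_trace_id,
    }))
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    // Spawns AWS_LAMBDA_MAX_CONCURRENCY workers when >= 2, otherwise runs sequentially.
    run_concurrent(service_fn(handler)).await
}
```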

Below is a concise summary of the changes (unfortunately many) by area.

Runtime & Config (lambda-runtime)

  • Read AWS_LAMBDA_MAX_CONCURRENCY directly inside the runtime to decide whether concurrent mode should be enabled and to size the HTTP client pool, without extending Config.
  • Runtime::new now sizes the lambda_runtime_api_client HTTP pool from max_concurrency so the number of idle connections matches expected concurrency.
  • Runtime::run remains the original sequential /next loop via run_with_incoming, preserving existing behavior.
  • New Runtime::run_concurrent spawns AWS_LAMBDA_MAX_CONCURRENCY worker tasks, each running its own /next loop. Sequential and concurrent modes share the same invocation processing helper, so there are not two separate event loop implementations to maintain. When Config::is_concurrent() is false, it falls back to the same sequential loop as Runtime::run.
  • Worker failures are isolated: if a worker exits/panics, the runtime logs the failure and continues with fewer workers. The runtime terminates only if it can't keep at least one worker alive (i.e., all workers have exited); a rough sketch follows this list.
  • Unexpected clean worker exits are treated as infrastructure errors so the runtime returns an error once all workers are gone.
  • Handler errors (user code returning Err) are reported to Lambda via /invocation/{id}/error and do not terminate the runtime.
  • The existing graceful shutdown handler stays aligned with upstream. We do not attempt to preempt/coordinate in-flight invocations on SIGTERM (the expectation is that Lambda only shuts down environments when they are idle; local emulators may behave differently).
  • Internal layers (api_client, api_response, trace) now implement/derive Clone so they can be composed into a cloneable service stack for the concurrent entrypoints.
  • Context::new is more robust when lambda-runtime-client-context / lambda-runtime-cognito-identity headers are present but empty (treated as None instead of failing JSON parse).
  • Added a regression test (concurrent_worker_crash_does_not_stop_other_workers) to verify worker isolation behavior.
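A rough sketch of the worker supervision described above (placeholder names: concurrent_worker_loop is a stub here, and the real implementation aggregates all worker errors rather than keeping only the last one):

```rust
use futures::stream::{FuturesUnordered, StreamExt};

type BoxError = Box<dyn std::error::Error + Send + Sync>;

// Stub worker: the real loop long-polls /runtime/invocation/next until the task fails.
async fn concurrent_worker_loop(worker_id: usize) -> Result<(), BoxError> {
    let _ = worker_id;
    Ok(())
}

async fn supervise(limit: usize) -> Result<(), BoxError> {
    let mut workers = FuturesUnordered::new();
    for worker_id in 0..limit {
        workers.push(tokio::spawn(concurrent_worker_loop(worker_id)));
    }

    let mut last_error: Option<BoxError> = None;
    while let Some(joined) = workers.next().await {
        // Any exit (clean, error, or panic) is unexpected; keep serving with the rest.
        last_error = Some(match joined {
            Ok(Ok(())) => "worker exited unexpectedly".into(),
            Ok(Err(e)) => e,
            Err(join_err) => join_err.into(),
        });
        tracing::error!(remaining = workers.len(), "runtime worker exited");
    }

    // Fail the runtime only once no workers remain.
    Err(last_error.unwrap_or_else(|| "all workers exited".into()))
}
```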

HTTP & streaming (lambda-http, lambda-runtime-api-client)

  • lambda_runtime_api_client::Client:
    • Added with_pool_size(usize) on the builder and threads a pool_size: Option<usize> into the Hyper client to set pool_max_idle_per_host (see the usage sketch after this list).
    • Still works as before when pool_size is not provided.
  • lambda_http::run and run_with_streaming_response keep their existing signatures and sequential behavior, delegating to lambda_runtime::run.
  • New lambda_http::run_concurrent and lambda_http::run_with_streaming_response_concurrent wrap the same handler types but require them to be Clone + Send + 'static (with response/stream bounds aligned to lambda_runtime::run_concurrent) so they can be driven by the concurrent runtime.
  • HTTP adapters (Adapter, StreamAdapter) are now Clone when the inner service is Clone, and the streaming path uses BoxCloneService internally for the concurrent entrypoint so the composed service stack can be cloned.
    • Note: run_with_streaming_response_concurrent is exported from lambda_http, similarly to run_with_streaming_response.
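Assuming the builder otherwise behaves as before, the new pool sizing might be wired up like this (a sketch; the exact error and return types of build() may differ):

```rust
use lambda_runtime_api_client::Client;

fn build_api_client(max_concurrency: usize) -> Result<Client, Box<dyn std::error::Error + Send + Sync>> {
    // Keep one idle connection per expected /next poller (pool_max_idle_per_host).
    Ok(Client::builder().with_pool_size(max_concurrency).build()?)
}
```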

Tooling & examples

  • Makefile and scripts/test-rie.sh:
    • Add RIE_MAX_CONCURRENCY and a test-rie-lmi target that runs RIE with AWS_LAMBDA_MAX_CONCURRENCY set, making it easy to exercise managed-instances behavior locally.
  • examples/basic-lambda-concurrent:
    • New minimal example that calls lambda_runtime::run_concurrent so the concurrent code path can be exercised under RIE/LMI.
  • examples/basic-lambda/src/main.rs:
    • Wraps lambda_runtime::run(func).await in an if let Err(err) block to log and propagate runtime errors when testing under RIE.

Validation

On this branch, I ran:

  • cargo +nightly fmt --all -- --check
  • cargo clippy --workspace --all-features -- -D warnings
  • cargo +stable test --all-features -p lambda_runtime_api_client -p lambda_runtime -p lambda_http
  • make test-rie EXAMPLE=basic-lambda
  • make test-rie-lmi EXAMPLE=basic-lambda-concurrent (sets AWS_LAMBDA_MAX_CONCURRENCY inside the RIE container)

If maintainers prefer, this could be split into smaller PRs (e.g., builder/Config prep, handler Clone changes, and finally the concurrent runtime), but this branch shows the full "end-to-end" implementation so that it can be tested with Lambda Managed Instances.


🔏 By submitting this pull request

  • I confirm that I've run cargo +nightly fmt --all -- --check.
  • I confirm that I've run cargo clippy --workspace --all-features -- -D warnings.
  • I confirm that I've made a best effort attempt to update all relevant documentation.
  • I confirm that my contribution is made under the terms of the Apache 2.0 license.

@alessandrobologna alessandrobologna marked this pull request as draft December 10, 2025 07:20
@alessandrobologna alessandrobologna marked this pull request as ready for review December 12, 2025 06:18
@alessandrobologna alessandrobologna force-pushed the feat/concurrent-lambda-runtime branch 2 times, most recently from 8a991a7 to db82e4b on December 18, 2025 03:43
@alessandrobologna
Contributor Author

Thanks @bnusunny for the approval!
Currently CI is failing on the semver check because Config has a new public field. I have a small follow‑up that keeps Config unchanged by moving the concurrency limit into Runtime (derived from AWS_LAMBDA_MAX_CONCURRENCY), which avoids a major version bump.

Patch: alessandrobologna@a8af68a

If you’re OK with that approach, I can push it directly to this PR branch so the checks go green.

- Add Config.max_concurrency and size runtime HTTP client pool from AWS_LAMBDA_MAX_CONCURRENCY.

- Introduce windowed concurrent /next polling with semaphore-limited handler tasks and shutdown coordination.

- Require Clone + Send + 'static handlers in lambda-runtime and lambda-http, and make internal layers/HTTP adapters cloneable.

- Adjust streaming HTTP to use BoxCloneService and align bounds for concurrent execution.

- Add RIE LMI helper (Makefile + test-rie.sh) and minor robustness improvements (Context parsing, basic example error logging).

Tests: cargo +stable fmt --all; cargo +stable clippy --all-targets --all-features; cargo +stable test --all (integration test requiring TEST_ENDPOINT not configured); ./scripts/test-rie.sh basic-lambda
- Re-export run_with_streaming_response_concurrent from lambda_http
- Remove now-unneeded dead_code allows in streaming helpers
- Cfg-gate Duration import behind graceful-shutdown
- Replace windowed polling + per-invocation tasks with N worker tasks, each
  running its own /next loop (AWS_LAMBDA_MAX_CONCURRENCY).
- Share invocation processing between sequential and concurrent paths.
- Align graceful shutdown behavior with upstream (run hook then exit).
- Add basic-lambda-concurrent example and update RIE helper script.
- Clarify run_concurrent docs to describe the worker model.
- remove shutdown signaling so worker failures stay isolated
- track remaining workers and return first infra error only when none remain
- treat unexpected clean worker exits as infra errors
- add deterministic worker-isolation regression test
- align concurrent HTTP doc wording with worker-loop model
- update concurrent worker exit warning per review nit
- drop redundant .cloned() in ApiGatewayV2 cookies mapping
  (clippy lint was pre-existing and unrelated to this PR)
@alessandrobologna alessandrobologna force-pushed the feat/concurrent-lambda-runtime branch from d54058c to f4a7612 on December 20, 2025 04:14
@bnusunny
Collaborator

Yes, please.

- drop Config.max_concurrency to avoid semver breakage
- add Runtime.concurrency_limit derived from AWS_LAMBDA_MAX_CONCURRENCY
- size the API client pool to match the up-front worker allocation
- update tests and Runtime literals accordingly
@alessandrobologna
Contributor Author

Ok, I've pushed the update to keep concurrency settings internal and to avoid the semver bump (so no Config public field needed). I also rebased the branch on current main.

- Replace env-var based X-Ray header injection with Context.xray_trace_id
- Set x-amzn-trace-id in HTTP and streaming adapters before handler call
- Remove legacy update_xray_trace_id_header helper from request parsing

Tests:
- cargo +nightly fmt --all
- cargo clippy --workspace --all-features -- -D warnings
- cargo +stable test --all-features -p lambda_runtime_api_client -p lambda_runtime -p lambda_http
Collaborator

@jlizen jlizen left a comment

Overall implementation looks great!

To summarize my feedback:

  • I think naming things based on _concurrent is overly limiting if we expect future features to require Clone; I'd suggest _extended instead, maybe? And we should probably nudge callers to use _extended, _concurrent, _maybe_concurrent, etc., unless they specifically need !Clone bounds?
  • The non-*_concurrent entrypoints should have doc comments warning that they ignore concurrency variables, and probably an error log or panic if the caller tries to invoke them with the concurrency ENV set.
  • I'd like to make it easier to trace per-worker issues by injecting task IDs into both worker loop spans and worker loop results.
  • I'd prefer to preserve all errors across workers rather than only the first. Also, it seems like we don't really need the full-batteries FuturesUnordered instead of join_all?
  • I don't like the OnceLock that gates the single trace message; it seems like it could be simplified?

./scripts/test-rie.sh $(EXAMPLE)

# Run RIE in Lambda Managed Instance (LMI) mode with concurrent polling.
test-rie-lmi:
Collaborator

Minor: is this makefile target + configurable variable worth mentioning in the README?

Contributor Author

Good point, thank you. I added a short note in the README under RIE testing showing make test-rie-lmi EXAMPLE=basic-lambda-concurrent

/// `/next` polling loop. When the environment variable is unset or `<= 1`,
/// it falls back to the same sequential behavior as [`run`], so the same
/// handler can run on both classic Lambda and Lambda Managed Instances.
pub async fn run_concurrent<R, S, E>(handler: S) -> Result<(), Error>
Collaborator

(Question, not sure): If the fallback case when AWS_LAMBDA_MAX_CONCURRENCY is unset is just lambda_runtime::run(), should this be our new suggested entrypoint, with fn run() marked as deprecated? I guess the main issue would be that the Clone bounds are more restrictive, so maybe deprecation is a bit aggressive, but we could add that in the run() doc comments?

The regular run() should definitely also add a warning that it WON'T respect AWS_LAMBDA_MAX_CONCURRENCY.

Context: My suspicion is, most consumers don't mind the Clone bound, and the 'principle of least surprise' would be for it to respect the ENV. So we probably should be pointing people to prefer this method where possible.

I would suggest at minimum the following:

  • probably this should be named run_maybe_concurrent? Or maybe run_extended, to avoid a combinatorial explosion of new entrypoint names as the Clone-requiring featureset grows?
  • We should have a guard clause in lambda_runtime::run() that either crashes the lambda runtime (preferred) or emits error logs (more conservative) if AWS_LAMBDA_MAX_CONCURRENCY is set, since that is clearly a misconfiguration that would prove confusing to debug.

Contributor Author

Thanks! I added a doc warning on run() and a fail‑fast guard when AWS_LAMBDA_MAX_CONCURRENCY is set. I kept the *_concurrent naming for now to limit "code churn", and also because _extended seems a little vague to me. But I am open to changing it if there's consensus on renaming all of them to what you suggested.
I have also added a note to the run() docstring to make it clearer that run_concurrent is preferred if the handler can satisfy Clone.
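For illustration, the guard is roughly of this shape (a sketch only, not the exact code pushed to the branch; the helper name and message text are made up):

```rust
use lambda_runtime::Error;

// Hypothetical guard at the top of the sequential run(): refuse to start when the
// environment asks for concurrency that this entrypoint cannot honor.
fn reject_concurrency_misconfig() -> Result<(), Error> {
    if let Ok(value) = std::env::var("AWS_LAMBDA_MAX_CONCURRENCY") {
        if value.parse::<usize>().map(|n| n > 1).unwrap_or(false) {
            return Err(format!(
                "AWS_LAMBDA_MAX_CONCURRENCY={value} is set but `run` is sequential; use `run_concurrent`"
            )
            .into());
        }
    }
    Ok(())
}
```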

// Replaces update_xray_trace_id_header (env var), now set from Context
fn update_xray_trace_id_header_from_context(headers: &mut http::HeaderMap, context: &Context) {
if let Some(trace_id) = context.xray_trace_id.as_deref() {
if let Ok(header_value) = http::HeaderValue::from_str(trace_id) {
Collaborator

Minor: What are the cases where http::HeaderValue::from_str() would fail? Should we have a warn or error-level log line if it does?

(I guess this is an existing behavior and this is more of a lift and shift?)

Contributor Author

Correct, this was the existing behavior. It could fail if the xray_trace_id contains invalid header characters, but it should always be valid so I didn't add logging to avoid noise. We could add a debug level log though. Let me know.
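For illustration, if that debug log were added, it might look roughly like this (a sketch mirroring the fragment above; the x-amzn-trace-id header name comes from the adapter change described earlier):

```rust
fn update_xray_trace_id_header_from_context(headers: &mut http::HeaderMap, context: &lambda_runtime::Context) {
    if let Some(trace_id) = context.xray_trace_id.as_deref() {
        match http::HeaderValue::from_str(trace_id) {
            Ok(header_value) => {
                headers.insert(http::header::HeaderName::from_static("x-amzn-trace-id"), header_value);
            }
            // from_str only fails if the trace ID contains characters that are invalid
            // in an HTTP header value; log at debug level to avoid noise.
            Err(err) => tracing::debug!(%err, "skipping invalid X-Ray trace ID header"),
        }
    }
}
```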

/// Builds a streaming-aware Tower service from a `Service<Request>` that can be
/// cloned and sent across tasks. This is used by the concurrent HTTP entrypoint.
#[allow(clippy::type_complexity)]
fn into_stream_service_boxed<S, B, E>(
Collaborator

Non-blocking: Do we actually need the extra allocation + indirection of BoxCloneService here? Anyway, we aren't leaking any hideous generic output type from this into the public API, so we could probably get by with pure generics?

I.e., we should be able to use bounds similar to into_stream_service with MapRequest and MapResponse, no? Just add the Clone bound and call it into_stream_service_cloneable?

Non-blocking since it's a fairly minor perf optimization, and we can change it in the future without semver impact.

Contributor Author

Very true! :) I dropped BoxCloneService and switched the concurrent streaming path to a cloneable MapRequest/MapResponse pipeline (similar to into_stream_service) so now it returns a generic cloneable service without the extra allocation/indirection. Thank you for pointing that out.
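For reference, the idea in generic tower terms (a sketch with plain String request/response types rather than the lambda_http-specific ones; ServiceExt's map_request/map_response need tower's "util" feature):

```rust
use tower::{Service, ServiceExt};

// Wrapping a Clone service with map_request/map_response keeps the whole stack Clone,
// so the concurrent runtime can hand an independent copy to each worker.
fn into_cloneable_pipeline<S>(
    inner: S,
) -> impl Service<String, Response = String, Error = S::Error> + Clone
where
    S: Service<String, Response = String> + Clone,
{
    inner
        .map_request(|req: String| format!("wrapped:{req}"))
        .map_response(|resp: String| resp.to_uppercase())
}
```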

}
}
}

Collaborator

It seems like THIS is the place where we want to capture whether ALL workers exited cleanly (though that would be a bit tricky to capture in current impl).

The nicer way might be to model the collected errors as:

enum WorkerError {
    CleanExit(Id),
    Failure((Id, BoxError))
}

That way we could flatten everything down into either a BoxError representing "all workers exited cleanly" or one representing "these workers exited cleanly, and these ones had these errors".

Contributor Author

Yes, thank you for the suggestion. I modeled this as an explicit enum (WorkerError::CleanExit(Id) / WorkerError::Failure(Id, BoxError)) and aggregate the variants into a single error. The formatter now distinguishes "all clean" vs "clean + failures".
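Roughly, that aggregation could be shaped like this (a sketch; Id is a placeholder alias here, not the PR's actual type):

```rust
type Id = usize; // hypothetical worker identifier
type BoxError = Box<dyn std::error::Error + Send + Sync>;

enum WorkerError {
    CleanExit(Id),
    Failure(Id, BoxError),
}

// Flatten all worker outcomes into a single error message once no workers remain.
fn aggregate(outcomes: Vec<WorkerError>) -> BoxError {
    let mut clean = Vec::new();
    let mut failed = Vec::new();
    for outcome in outcomes {
        match outcome {
            WorkerError::CleanExit(id) => clean.push(id.to_string()),
            WorkerError::Failure(id, err) => failed.push(format!("worker {id}: {err}")),
        }
    }
    if failed.is_empty() {
        format!("all workers exited cleanly: [{}]", clean.join(", ")).into()
    } else {
        format!(
            "workers exited cleanly: [{}]; workers failed: [{}]",
            clean.join(", "),
            failed.join("; ")
        )
        .into()
    }
}
```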

}
}

match first_error {
Collaborator

In the current impl, None should essentially never be reachable, right? Since even if we have no task-specific errors, we instead create an error out of a clean exit?

If the desired API is to return Ok(()) on all workers cleanly exiting, then we could use the enum mentioned above to gather that info, and emit an error log line if all exited cleanly, but then drop the error.

Contributor Author

Yes, I am now using the WorkerError and aggregate everything.

For the other point, I currently treat all workers exiting (clean or not) as an error because then the runtime has no workers left and can't serve any more invocations. Returning Ok(()) here would make the runtime exit quietly and hide the failure. But I can change it if you prefer.


let mut workers = FuturesUnordered::new();
for _ in 1..limit {
workers.push(tokio::spawn(concurrent_worker_loop(
Collaborator

Would it be worth instrumenting this future with a span containing the task ID? Seems pretty useful for debugging.

Contributor Author

Yes, I have added a span on the worker loop with the Tokio task id and instrumented the per-invocation processing with it.
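For illustration, something along these lines (a sketch that records a plain worker index; the actual change records the Tokio task ID):

```rust
use futures::stream::FuturesUnordered;
use tracing::Instrument;

async fn worker_loop(worker_id: usize) {
    // Placeholder for the real /next polling loop.
    tracing::info!(worker_id, "worker started");
}

fn spawn_workers(limit: usize) -> FuturesUnordered<tokio::task::JoinHandle<()>> {
    let workers = FuturesUnordered::new();
    for worker_id in 0..limit {
        // Attach a per-worker span so every log line from this loop carries its ID.
        let span = tracing::info_span!("lambda_worker", worker_id);
        workers.push(tokio::spawn(worker_loop(worker_id).instrument(span)));
    }
    workers
}
```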

let context = Context::new(invoke_request_id(&parts.headers)?, config.clone(), &parts.headers)?;
let invocation = LambdaInvocation { parts, body, context };

if set_amzn_trace_env {
Collaborator

Nit: I don't love the extra branching here since this is pure overhead for the non-concurrent case. But I guess branch prediction will probably make this pointless to optimize, and I don't think it's worth a bunch of code duplication.

Collaborator

(No action needed here, unless you feel particularly inspired, fine to resolve this)

Contributor Author

I removed the XRAY_LOGGED.get_or_init branch and now only call amzn_trace_env() when set_amzn_trace_env is true. The concurrent message is logged once at startup, as per your suggestion, so the per‑invocation path is simpler

amzn_trace_env(&invocation.context);
} else {
// Inform users that X-Ray is available via context, not env var, in concurrent mode.
XRAY_LOGGED.get_or_init(|| {
Collaborator

The OnceLock + get_or_init every invocation seems like unnecessary overhead / complexity. Why can't we just display this message once, at the start of run_concurrent?

Contributor Author

Agree.

- aggregate worker exits with task IDs and surface clean exits in errors
- add per-worker spans and log unexpected exits at error level
- guard run() when AWS_LAMBDA_MAX_CONCURRENCY is set; docs prefer run_concurrent
- make streaming adapter cloneable for concurrent HTTP path
- document pool size guidance and LMI test target; log example via tracing